package com.guzhi.service.upload.pool;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.math.NumberUtils;
import org.csource.fastdfs.ClientGlobal;
import org.csource.fastdfs.TrackerClient;
import org.csource.fastdfs.TrackerGroup;
import org.csource.fastdfs.TrackerServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * Fixed-size pool of FastDFS tracker connections.
 *
 * <p>Connections are handed out with {@link #checkout(int)}, returned with
 * {@link #checkin(TrackerServer)} and replaced with {@link #drop(TrackerServer)} when
 * they turn out to be broken. When no replacement connection can be obtained a
 * background "detector" thread polls the tracker until it is reachable again and then
 * rebuilds the whole pool.
 *
 * <p>The pool is populated asynchronously, three seconds after construction; until
 * then {@link #checkout(int)} fails with a {@link NullPointerException}.
 */
@Service
public class ConnectionPool {

    private final static Logger LOG = LoggerFactory.getLogger(ConnectionPool.class);

    // Connections currently handed out to callers (the map is used as a concurrent set).
    // volatile: the reference is replaced by init(), which runs on the timer/detector thread.
    private volatile ConcurrentHashMap<TrackerServer, Object> busyConnectionPool = null;
    // Connections ready to be handed out.
    // volatile: the reference is replaced by init(), which runs on the timer/detector thread.
    private volatile ArrayBlockingQueue<TrackerServer> idleConnectionPool = null;

    // Tracker addresses as "host:port" entries separated by ';' or ','.
    @Value("${fastdfs.uri}")
    private String uri;
    // Maximum number of pooled connections.
    private final int size = 5;

    // Shared marker value stored for every key of busyConnectionPool.
    private final Object obj = new Object();

    // Heart beat, created once the pool has been initialised.
    HeartBeat beat = null;

    // True while the detector thread is waiting for the tracker to come back.
    // volatile: written by the detector thread, read by threads calling drop().
    volatile boolean hasConnectionException = false;

    /**
     * Schedules the one-shot pool initialisation and heart beat registration.
     *
     * <p>NOTE(review): the 3 second delay presumably gives Spring time to inject
     * {@code uri} before {@link #init()} reads it - confirm before changing it.
     */
    public ConnectionPool() {
        final Timer timer = new Timer();
        timer.schedule(new TimerTask() {

            @Override
            public void run() {
                try {
                    init();
                    // register the heart beat
                    beat = new HeartBeat(ConnectionPool.this);
                    beat.beat();
                } catch (RuntimeException e) {
                    LOG.error("ImageServerPool initialization failed", e);
                } finally {
                    // One-shot task: cancel the timer so its thread can terminate
                    // instead of lingering for the lifetime of the JVM.
                    timer.cancel();
                }
            }
        }, 3000);
    }

    /**
     * (Re)creates both pools and fills the idle pool with up to {@code size} verified
     * connections. Filling stops at the first connection that cannot be established.
     */
    private void init() {
        initClientGlobal();
        busyConnectionPool = new ConcurrentHashMap<TrackerServer, Object>();
        ArrayBlockingQueue<TrackerServer> idle = new ArrayBlockingQueue<TrackerServer>(this.size);
        idleConnectionPool = idle;
        TrackerClient tc = new TrackerClient();
        for (int i = 0; i < size; i++) {
            TrackerServer trackerServer = null;
            boolean pooled = false;
            try {
                trackerServer = tc.getConnection();
                if (trackerServer == null) {
                    // drop() and detector() already treat a null result as
                    // "tracker unreachable"; do the same here instead of dereferencing it.
                    LOG.warn("ImageServerPool init: no tracker connection available");
                    break;
                }
                org.csource.fastdfs.ProtoCommon.activeTest(trackerServer.getSocket());
                // offer() instead of add(): a concurrent drop() may already have filled a slot.
                pooled = idle.offer(trackerServer);
                if (pooled) {
                    LOG.info("trackerServer init: " + trackerServer.getInetSocketAddress().getHostString());
                }
            } catch (IOException e) {
                LOG.error("ImageServerPool init failed to create a connection", e);
                break;
            } finally {
                // Close only connections that did NOT make it into the pool; a pooled
                // connection must stay open for later checkout.
                if (!pooled) {
                    closeQuietly(trackerServer);
                }
            }
        }
    }

    /**
     * Takes a connection from the idle pool and marks it busy.
     *
     * @param waitTimes maximum time to wait for an idle connection, in seconds
     * @return a connection that must later be passed to {@link #checkin(TrackerServer)}
     *         or {@link #drop(TrackerServer)}
     * @throws NullPointerException if the pool is not initialised yet or no connection
     *         became idle within {@code waitTimes} seconds
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public TrackerServer checkout(int waitTimes) throws InterruptedException {
        ArrayBlockingQueue<TrackerServer> idle = idleConnectionPool;
        if (idle == null) {
            throw new NullPointerException("ImageServerPool is not initialized yet");
        }
        TrackerServer client1 = idle.poll(waitTimes, TimeUnit.SECONDS);
        if (client1 == null) {
            LOG.warn("ImageServerPool wait time out ,return null");
            throw new NullPointerException("ImageServerPool wait time out ,return null");
        }
        busyConnectionPool.put(client1, obj);
        return client1;
    }

    /**
     * Returns a healthy connection to the idle pool. Connections that are not
     * registered as busy are ignored.
     *
     * <p>NOTE(review): connections checked out before a pool rebuild are no longer in
     * the busy map, so they are ignored here and stay open until the caller drops them.
     *
     * @param client1 the connection obtained from {@link #checkout(int)}; {@code null}
     *        is ignored
     */
    public void checkin(TrackerServer client1) {
        if (client1 == null) {
            return;
        }
        if (busyConnectionPool.remove(client1) != null) {
            // offer() instead of add(): never throw at the caller when the idle pool
            // is unexpectedly full, just release the surplus connection.
            if (!idleConnectionPool.offer(client1)) {
                LOG.warn("ImageServerPool idle pool is full, closing surplus connection");
                closeQuietly(client1);
            }
        }
    }

    /**
     * Discards a broken connection and tries to replace it with a fresh one.
     *
     * <p>If no replacement can be obtained, the detector thread is started; it waits
     * until the tracker is reachable again and then rebuilds the pool. While the
     * detector is running, replacements are not added to the pool.
     *
     * @param trackerServer the broken connection; {@code null} is ignored
     */
    public synchronized void drop(TrackerServer trackerServer) {
        if (trackerServer == null) {
            return;
        }
        closeQuietly(trackerServer);
        if (busyConnectionPool.remove(trackerServer) == null) {
            return;
        }
        LOG.warn("ImageServerPool drop a connnection");
        LOG.warn("ImageServerPool size:" + (busyConnectionPool.size() + idleConnectionPool.size()));

        TrackerServer replacement = null;
        try {
            replacement = new TrackerClient().getConnection();
        } catch (IOException e) {
            LOG.warn("ImageServerPool getConnection generate exception", e);
        }
        if (!isContinued(replacement)) {
            // The detector will rebuild the pool; do not leak the unused replacement.
            closeQuietly(replacement);
            return;
        }

        boolean pooled = false;
        try {
            // Verify the new connection by exchanging data before pooling it.
            org.csource.fastdfs.ProtoCommon.activeTest(replacement.getSocket());
            pooled = idleConnectionPool.offer(replacement);
            if (pooled) {
                LOG.warn("ImageServerPool add a connnection");
                LOG.warn("ImageServerPool size:" + (busyConnectionPool.size() + idleConnectionPool.size()));
            }
        } catch (IOException e) {
            LOG.warn("ImageServerPool replacement connection failed the active test", e);
        } finally {
            if (!pooled) {
                closeQuietly(replacement);
            }
        }
    }

    /**
     * Decides whether a replacement connection may be added to the pool, starting the
     * detector thread when the replacement could not be obtained.
     *
     * @param trackerServer the replacement connection, or {@code null} if none could
     *        be obtained
     * @return {@code true} only if {@code trackerServer} is non-null and no detector is
     *         running
     */
    public synchronized boolean isContinued(TrackerServer trackerServer) {
        if (hasConnectionException) {
            // The detector is running; even a valid connection has to wait until the
            // detector has rebuilt the pool.
            return false;
        }
        if (trackerServer == null) {
            hasConnectionException = true;
            // Only one detector thread is started: the flag above blocks further ones.
            detector();
            return false;
        }
        return true;
    }

    /**
     * Starts a background thread that polls the tracker every five seconds until a
     * connection succeeds, then closes all idle connections and rebuilds the pool.
     */
    private void detector() {

        new Thread("fastdfs-pool-detector") {
            @Override
            public void run() {
                String msg = "detector connection failed";
                TrackerServer probe = null;
                boolean interrupted = false;
                while (true) {
                    try {
                        probe = new TrackerClient().getConnection();
                    } catch (IOException e) {
                        LOG.warn("detector getConnection generate exception", e);
                    }
                    if (probe == null) {
                        LOG.warn("current ImageServerPool has size:"
                                + (busyConnectionPool.size() + idleConnectionPool.size()));
                        LOG.warn(msg);
                    }
                    try {
                        // Pause after every attempt so that a failing tracker is not
                        // hammered in a tight loop.
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                    if (probe != null || interrupted) {
                        break;
                    }
                }
                if (interrupted) {
                    // Preserve the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
                if (probe == null) {
                    // Interrupted before the tracker came back: give up and clear the
                    // flag so that a later drop() can start a new detector.
                    LOG.warn("detector interrupted before the tracker became reachable");
                    hasConnectionException = false;
                    return;
                }

                msg = "detector connection success to " + probe.getInetSocketAddress().getHostString();
                // The probe only proves reachability; it is not pooled, so release it.
                closeQuietly(probe);
                LOG.warn(msg);

                ArrayBlockingQueue<TrackerServer> idle = idleConnectionPool;
                if (idle.size() != 0) {
                    LOG.warn("idleConnectionPool start close trackerserver");
                    LOG.warn(msg);

                    TrackerServer ts;
                    while ((ts = idle.poll()) != null) {
                        closeQuietly(ts);
                    }
                }
                try {
                    // re init
                    init();
                } finally {
                    // Clear the flag only after the rebuild so that drop() cannot add
                    // connections to a pool that is still being filled.
                    hasConnectionException = false;
                }
            }
        }.start();
    }

    /**
     * Closes a connection as best-effort cleanup; failures are logged, never thrown.
     *
     * @param trackerServer the connection to close; {@code null} is ignored
     */
    private static void closeQuietly(TrackerServer trackerServer) {
        if (trackerServer == null) {
            return;
        }
        try {
            trackerServer.close();
        } catch (Exception e) {
            // Cleanup must never abort the caller.
            LOG.warn("ImageServerPool failed to close a connection", e);
        }
    }

    /**
     * Configures the FastDFS client globals from the {@code fastdfs.uri} property.
     *
     * @throws IllegalStateException if {@code fastdfs.uri} is missing or blank
     * @throws IllegalArgumentException if an entry is not of the form {@code host:port}
     */
    private void initClientGlobal() {
        if (uri == null || uri.trim().isEmpty()) {
            throw new IllegalStateException("fastdfs.uri is not configured");
        }
        String[] hosts = uri.split("[;,]");
        InetSocketAddress[] trackerServers = new InetSocketAddress[hosts.length];
        for (int i = 0; i < hosts.length; i++) {
            String h = hosts[i].trim();
            // Accept both the ASCII and the full-width colon as host/port separator.
            String[] hs = h.split("[：:]");
            if (hs.length < 2) {
                throw new IllegalArgumentException(
                        "Invalid tracker address '" + h + "' in fastdfs.uri, expected host:port");
            }
            trackerServers[i] = new InetSocketAddress(hs[0].trim(), NumberUtils.toInt(hs[1].trim()));
        }
        ClientGlobal.setG_tracker_group(new TrackerGroup(trackerServers));
        // connect timeout in milliseconds
        ClientGlobal.setG_connect_timeout(30000);
        // network timeout in milliseconds
        ClientGlobal.setG_network_timeout(60000);
        ClientGlobal.setG_anti_steal_token(false);
        // charset
        ClientGlobal.setG_charset("UTF-8");
        ClientGlobal.setG_secret_key(null);
    }

    /**
     * @return the current idle pool, or {@code null} before the pool is initialised
     */
    public ArrayBlockingQueue<TrackerServer> getIdleConnectionPool() {
        return idleConnectionPool;
    }

}
